
In this notebook we finally tackle our larger area. We're going to need some better visualisation tools, and ideally we won't bring the results back to the Jupyter notebook but will instead leverage the dask cluster's resources during visualisation. We'll be using some dask-aware visualisation libraries (holoviews and datashader) to do the heavy lifting.
Let's begin by starting up our cluster and sizing it appropriately to our computational task.
All the code here is the same as the conclusion of the previous notebook, except we'll make the cluster bigger, with 10 workers instead of 4. We'll also wrap the masking and NDVI calculation in a Python function, since we won't be making any changes to it from now on.
We'll use the same ROI and time period for this run, and we're using all the techniques covered so far to reduce the computation time:
# Initialize the Gateway client
from dask.distributed import Client
from dask_gateway import Gateway

number_of_workers = 10

gateway = Gateway()
clusters = gateway.list_clusters()
if not clusters:
    print('Creating new cluster. Please wait for this to finish.')
    cluster = gateway.new_cluster()
else:
    print(f'An existing cluster was found. Connecting to: {clusters[0].name}')
    cluster = gateway.connect(clusters[0].name)
cluster.scale(number_of_workers)
client = cluster.get_client()
client
An existing cluster was found. Connecting to: easihub.b444f7be6e5f4b9888259e752ac0d2b1
Client-ad70b3b6-064b-11ee-82af-8e60251f35b2
| Connection method: Cluster object | Cluster type: dask_gateway.GatewayCluster |
| Dashboard: https://hub.adias.aquawatchaus.space/services/dask-gateway/clusters/easihub.b444f7be6e5f4b9888259e752ac0d2b1/status |
import pyproj
pyproj.set_use_global_context(True)
import git
import sys, os
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from dask.distributed import Client, LocalCluster
import datacube
from datacube.utils import masking
from datacube.utils.aws import configure_s3_access
# EASI defaults
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "adias"
dc = datacube.Datacube()
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client)
<botocore.credentials.Credentials at 0x7f6d148e0940>
# Get the centroid of the coordinates of the default extents
central_lat = sum(easi.latitude)/2
central_lon = sum(easi.longitude)/2
# central_lat = -42.019
# central_lon = 146.615
# Set the buffer to load around the central coordinates.
# This is a radial distance from the centre, so the bounding box spans 2 x buffer in each dimension.
buffer = 0.8
# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
# Data product
products = easi.product('landsat')
# Set the date range to load data over
set_time = easi.time
set_time = (set_time[0], parse(set_time[0]) + relativedelta(years=1))
# set_time = ("2021-01-01", "2021-12-31")
# Selected measurement names (used in this notebook)
alias = easi.aliases('landsat')
measurements = [alias[x] for x in ['qa_band', 'red', 'nir']]
# Set the QA band name and mask values
qa_band = alias['qa_band']
qa_mask = easi.qa_mask('landsat')
# Set the resampling method for the bands
resampling = {qa_band: "nearest", "*": "average"}
# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat') # If defined, else None
set_resolution = easi.resolution('landsat') # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)
# Set the scene group_by method
group_by = "solar_day"
def masked_seasonal_ndvi(dataset):
    # Identify pixels that are either "valid", "water" or "snow"
    cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
    # Apply the mask
    cloud_free = dataset.where(cloud_free_mask)
    # Calculate the components that make up the NDVI calculation
    band_diff = cloud_free[alias['nir']] - cloud_free[alias['red']]
    band_sum = cloud_free[alias['nir']] + cloud_free[alias['red']]
    # Calculate NDVI
    ndvi = band_diff / band_sum
    # Calculate the seasonal mean
    return ndvi.groupby("time.season").mean("time")
dataset = None  # clear results from any previous runs
dataset = dc.load(
    product=products,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks={"time": 2, "x": 3072, "y": 3072},
    group_by=group_by,
)
ndvi_unweighted = masked_seasonal_ndvi(dataset)
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
print(f"ndvi_unweighted size (GiB) {ndvi_unweighted.nbytes / 2**30:.2f}")
dataset size (GiB) 37.06
ndvi_unweighted size (GiB) 2.86
client.wait_for_workers(n_workers=10)
cluster
%%time
actual_result = ndvi_unweighted.compute()
CPU times: user 4.34 s, sys: 1.88 s, total: 6.22 s Wall time: 3min 50s
You'll notice the computation time is slightly faster with more workers: we're IO bound, so more workers means more available IO bandwidth and threads. It's not 2-3x faster though; we're wasting a lot of resources because we can't actually use all of that extra capacity.
Tip: More isn't always better. Be mindful of your computational resource usage and cost. A cluster of this size is a tremendous waste for a computational job of this size. Size things appropriately.
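One way to reason about whether extra workers are paying off is to compare the observed speedup against ideal linear scaling. The helper below is purely illustrative (not a dask API), and the 4-worker time in the example call is a hypothetical stand-in since it isn't recorded in this notebook; only the roughly 230 s, 10-worker wall time above is real.

```python
def parallel_efficiency(t_old, n_old, t_new, n_new):
    """Observed speedup divided by the ideal linear speedup (1.0 = perfect scaling)."""
    speedup = t_old / t_new
    ideal = n_new / n_old
    return speedup / ideal

# Hypothetical illustration: if 4 workers took 300 s and 10 workers take 230 s,
# efficiency is (300/230) / (10/4) ~= 0.52, i.e. roughly half the extra
# capacity is doing useful work -- consistent with being IO bound.
print(round(parallel_efficiency(300, 4, 230, 10), 2))
```

A value well below 1.0 is a signal to scale the cluster down (or find a bigger job for it).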
And visualise the result
actual_result.sel(season='DJF').plot()
<matplotlib.collections.QuadMesh at 0x7f6d143c78e0>
We'll save the coordinates of this section from the array (as slices) so we can use them later for visualising the same ROI from a larger dataset.
x_slice = slice(ndvi_unweighted.x[0], ndvi_unweighted.x[-1])
y_slice = slice(ndvi_unweighted.y[0], ndvi_unweighted.y[-1])
Let's change the area extent to about 4 degrees square.
# Set the buffer to load around the central coordinates
buffer = 2

# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
# Check the map below to see if you are including the area that you want. For this example, it would be best to not include too much water.
from dea_tools.plotting import display_map
display_map(study_area_lon, study_area_lat)
/env/lib/python3.10/site-packages/dea_tools/plotting.py:313: FutureWarning: This function is deprecated. See: https://pyproj4.github.io/pyproj/stable/gotchas.html#upgrading-to-pyproj-2-from-pyproj-1
all_longitude, all_latitude = transform(Proj(crs), Proj('EPSG:4326'), all_x,
dataset = None  # clear results from any previous runs
dataset = dc.load(
    product=products,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks={"time": 2, "x": 3072, "y": 3072},
    group_by=group_by,
)
ndvi_unweighted = masked_seasonal_ndvi(dataset)
Before we compute anything, let's take a look at our result's shape and size.
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
print(f"ndvi_unweighted size (GiB) {ndvi_unweighted.nbytes / 2**30:.2f}")
dataset size (GiB) 375.76
ndvi_unweighted size (GiB) 17.89
This is now much bigger!
The result is getting on the large side for the notebook node, so we will need to pay attention to data locality and the size of the results being processed. The cluster has a LOT more memory than the notebook node; bring too much back to the notebook and it will crash.
Tip: Be mindful of the size of the results and their data locality.
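One way to apply this tip is to compare a result's size against a memory budget before materialising it locally. This is a minimal sketch: `safe_to_compute` is a helper defined here (not a dask or xarray API) and the budget value is a hypothetical placeholder; substitute your notebook node's actual RAM. It works on anything exposing `.nbytes`, including the xarray and dask objects above.

```python
import numpy as np

NOTEBOOK_MEM_BUDGET_GIB = 8  # hypothetical budget; check your notebook node's RAM

def safe_to_compute(obj, budget_gib=NOTEBOOK_MEM_BUDGET_GIB):
    """True if materialising obj (anything with .nbytes) fits within the budget."""
    return obj.nbytes / 2**30 <= budget_gib

small = np.zeros((1000, 1000), dtype=np.float64)  # ~7.6 MiB
print(safe_to_compute(small))  # True -- safe to bring back to the notebook
```

With the 17.89 GiB `ndvi_unweighted` above, a check like this would flag that the full result should stay on the cluster.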
Now let's check the shape, tasks and chunks
dataset
<xarray.Dataset>
Dimensions: (time: 112, y: 24512, x: 24494)
Coordinates:
* time (time) datetime64[ns] 2022-02-04T18:45:01.040642 ... 2023-01...
* y (y) float64 1.413e+07 1.413e+07 ... 1.339e+07 1.339e+07
* x (x) float64 5.1e+06 5.1e+06 5.1e+06 ... 5.835e+06 5.835e+06
spatial_ref int32 32650
Data variables:
qa_pixel (time, y, x) uint16 dask.array<chunksize=(2, 3072, 3072), meta=np.ndarray>
red (time, y, x) uint16 dask.array<chunksize=(2, 3072, 3072), meta=np.ndarray>
nir08 (time, y, x) uint16 dask.array<chunksize=(2, 3072, 3072), meta=np.ndarray>
Attributes:
crs: epsg:32650
grid_mapping: spatial_ref
Looking at the red data variable we can see about 125 GiB for the array, 36 MiB per chunk and 5526 tasks. Note that the nir and qa_band variables will be similarly shaped and sized.
The number of tasks is climbing so we can expect an increase in task graph optimisation time.
Chunk size and tasks seems okay, but we will monitor the dask dashboard in case there are issues with temporaries causing workers to spill to disk if memory is too full.
The chunking is resulting in some slivers, particularly on the y axis. Let's modify the y chunk size so these slivers don't exist, as they're blowing out the task count and are likely unnecessary. We will calculate the y chunk size below to get a nice fit. Make sure to check the chunk size afterwards to ensure it doesn't get too large; if it does, we can instead make the chunks smaller to reduce the slivers.
from math import ceil
y_chunks = ceil(dataset.dims['y']/5)
y_chunks
4903
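The arithmetic behind this choice can be checked directly. The helpers below are a sketch (not part of dask) that reproduces the per-chunk sizes and chunk counts discussed in this notebook, using the dataset's dimensions from above (time=112, y=24512, x=24494) and the 2-byte uint16 dtype of the bands.

```python
from math import ceil

def chunk_mib(chunk_shape, dtype_bytes):
    """Size of one chunk in MiB given its shape and element size in bytes."""
    n_elements = 1
    for c in chunk_shape:
        n_elements *= c
    return n_elements * dtype_bytes / 2**20

def n_chunks(dim_size, chunk_size):
    """Chunks needed to cover one dimension, counting any trailing sliver."""
    return ceil(dim_size / chunk_size)

print(chunk_mib((2, 3072, 3072), 2))  # original chunks: 36.0 MiB each
print(n_chunks(24512, 3072))          # 8 chunks on y, the last one a sliver
print(chunk_mib((2, 4903, 3072), 2))  # resized chunks: ~57 MiB each
print(n_chunks(24512, 4903))          # 5 nearly-equal chunks on y
```

Going from 8 y-chunks to 5 removes the sliver chunks while keeping each chunk comfortably sized.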
dataset = None  # clear results from any previous runs
dataset = dc.load(
    product=products,
    x=study_area_lon,
    y=study_area_lat,
    time=set_time,
    measurements=measurements,
    resampling=resampling,
    output_crs=set_crs,
    resolution=set_resolution,
    dask_chunks={"time": 2, "x": 3072, "y": y_chunks},
    group_by=group_by,
)
ndvi_unweighted = masked_seasonal_ndvi(dataset)
Now recheck our chunk size and tasks for red
dataset
<xarray.Dataset>
Dimensions: (time: 112, y: 24512, x: 24494)
Coordinates:
* time (time) datetime64[ns] 2022-02-04T18:45:01.040642 ... 2023-01...
* y (y) float64 1.413e+07 1.413e+07 ... 1.339e+07 1.339e+07
* x (x) float64 5.1e+06 5.1e+06 5.1e+06 ... 5.835e+06 5.835e+06
spatial_ref int32 32650
Data variables:
qa_pixel (time, y, x) uint16 dask.array<chunksize=(2, 4903, 3072), meta=np.ndarray>
red (time, y, x) uint16 dask.array<chunksize=(2, 4903, 3072), meta=np.ndarray>
nir08 (time, y, x) uint16 dask.array<chunksize=(2, 4903, 3072), meta=np.ndarray>
Attributes:
crs: epsg:32650
grid_mapping: spatial_ref
A very marginal increase in memory per chunk, but the tasks have dropped from 5526 to 4661. Note that this occurs for every measurement and operation, so the benefit is significant.
ndvi_unweighted
<xarray.DataArray (season: 4, y: 24512, x: 24494)>
dask.array<stack, shape=(4, 24512, 24494), dtype=float64, chunksize=(1, 4903, 3072), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 1.413e+07 1.413e+07 ... 1.339e+07 1.339e+07
* x (x) float64 5.1e+06 5.1e+06 5.1e+06 ... 5.835e+06 5.835e+06
spatial_ref int32 32650
* season   (season) object 'DJF' 'JJA' 'MAM' 'SON'
The total task count is below 100,000 so it should be okay, but task graph optimisation will take a while. The resulting array is a bit big for the notebook node, as noted previously.
The spatial shape is y: 24512, x: 24494. Standard plots aren't going to work well for visualising the result in the notebook, and the result uses a fair amount of memory, so we'll need a different approach.
For now let's visualize the same ROI as the small area before. We stashed that ROI in x_slice, y_slice.
If you haven't already, open the dask dashboard so you can watch the cluster make progress.
The code to do this visualisation is basically the same as before, except now we specify a slice:
%%time
ndvi_unweighted.sel(season='DJF', x=x_slice, y=y_slice).compute().plot(robust=True, size=6, aspect='equal')
CPU times: user 9.6 s, sys: 1.27 s, total: 10.9 s Wall time: 3min 17s
<matplotlib.collections.QuadMesh at 0x7f6d03525bd0>
The computation time is relatively short since we are only materialising the result for a subset of the overall dataset.
To visualise all of the data we will make use of the dask cluster and some dask-aware visualisation capabilities from the holoviews and datashader Python libraries. These libraries provide an interactive visualisation capability that leaves the large datasets on the cluster and transmits only the final visualisation to the Jupyter notebook. This is done on the fly, so the user can zoom and pan about the dataset in all dimensions and the dask cluster will scale the data to fit the viewport automatically. Details of how this is done and the advanced features available are beyond the scope of this dask and ODC course, but the manuals are extensive and the basic example here is both powerful and useful.
Tip: The datashader pipeline page provides an excellent summary of what's going on.
compute() and persist()

The first thing we will do is persist() the results of our calculation to the cluster. This will materialise the results but keep them on the cluster (all lazy tasks are calculated, just like compute(), but data locality remains on the cluster). This ensures the result is readily available for the visualisation. The cluster has plenty of (distributed) memory, so there is no reason not to materialise the result there.
persist() is non-blocking, so it will return as soon as task graph optimisation (which is performed in the notebook kernel) is complete. Run the next cell and you will see it takes a few seconds to do the task graph optimisation; once that is complete, the Jupyter notebook will be available for use again. At the same time, the dask dashboard will show tasks running as the result is computed and left on the cluster.
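The difference between the two can be seen on a small local dask array, no cluster required; the semantics carry over to the distributed scheduler, where persist() keeps the materialised chunks in the workers' memory rather than sending them to the client.

```python
import dask.array as da

# A tiny lazy computation: the mean of a 1000 x 1000 array of ones
lazy = da.ones((1000, 1000), chunks=(250, 250)).mean()

materialised = lazy.compute()  # runs the graph, returns a concrete value locally
still_lazy = lazy.persist()    # runs the graph, result stays a dask collection

print(type(materialised))  # a concrete numpy scalar -- data is local
print(type(still_lazy))    # still a dask array, but its tasks are already done
print(float(still_lazy.compute()))  # cheap now: just fetches the stored result
```

On a gateway cluster, `compute()` on the full seasonal NDVI would ship ~18 GiB to the notebook; `persist()` leaves it distributed.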
%%time
on_cluster_result = ndvi_unweighted.persist()
# wait(on_cluster_result)
on_cluster_result
CPU times: user 1.78 s, sys: 44.1 ms, total: 1.83 s Wall time: 1.8 s
<xarray.DataArray (season: 4, y: 24512, x: 24494)>
dask.array<stack, shape=(4, 24512, 24494), dtype=float64, chunksize=(1, 4903, 3072), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 1.413e+07 1.413e+07 ... 1.339e+07 1.339e+07
* x (x) float64 5.1e+06 5.1e+06 5.1e+06 ... 5.835e+06 5.835e+06
spatial_ref int32 32650
* season   (season) object 'DJF' 'JJA' 'MAM' 'SON'
The on_cluster_result will continue to show as a dask array on the cluster - not actual results. Think of it as a handle that links the Jupyter client to the result on the dask cluster.
The cluster will start the computation, but we can continue working in the notebook. Let's import some new visualisation libraries: holoviews and datashader. We'll be using datashader's rasterize to handle the scaling of the full dataset, which has many more pixels than what is being displayed in the notebook.
Notice also that there are no bounds set on the dataset; we are viewing the entire result, including the season dimension. holoviews provides an interface for pan, zoom and time (season) selection, and you can use the mouse to move around the data.
Tip: Keep watching your Dask dashboard to see how the calculations are progressing.
import holoviews as hv
import xarray as xr
from holoviews import opts
from holoviews.operation.datashader import rasterize
hv.extension("bokeh", width=800)
holoviews expects the xarray.DataArray to have a name, so let's give it one via the name attribute:
on_cluster_result.name = "ndvi"
on_cluster_result
<xarray.DataArray 'ndvi' (season: 4, y: 24512, x: 24494)>
dask.array<stack, shape=(4, 24512, 24494), dtype=float64, chunksize=(1, 4903, 3072), chunktype=numpy.ndarray>
Coordinates:
* y (y) float64 1.413e+07 1.413e+07 ... 1.339e+07 1.339e+07
* x (x) float64 5.1e+06 5.1e+06 5.1e+06 ... 5.835e+06 5.835e+06
spatial_ref int32 32650
* season      (season) object 'DJF' 'JJA' 'MAM' 'SON'
aspect = on_cluster_result.sizes['y'] / on_cluster_result.sizes['x']
width = 700
height = int(width*aspect)
# holoviews works better with datasets, so convert the xarray DataArray holding NDVI into a Dataset
ndvi_seasonal_mean_ds = on_cluster_result.to_dataset(name="ndvi_seasonal_mean")
The next cell will display the result - when it's ready. The rasterize function will calculate representative pixels for display from the full array on the dask cluster. If you monitor the dashboard you will see small bursts of activity across the workers, and some waiting while data transfers bring the summary information back to the Jupyter notebook.
You can use the controls on the right to pan and zoom around the full image. If you zoom in, rasterize will take a moment to generate a new summary for the current zoom level and show more or less detail. The same applies to panning.
hv_ds = hv.Dataset(ndvi_seasonal_mean_ds)
rasterize(
    hv_ds.to(hv.Image, ["x", "y"], "ndvi_seasonal_mean", ["season"]).opts(
        title="NDVI Seasonal Mean",
        cmap="RdYlGn",      # NDVI: the greener, the higher the value
        clim=(-0.3, 0.8),   # clamp the range to enhance the visualisation
        colorbar=True,
        frame_width=width,
        frame_height=height,
    ),
    precompute=True,
)
Disconnecting your client is good practice, but the cluster will still be up, so we need to shut it down as well:
client.close()
cluster.shutdown()